-
Notifications
You must be signed in to change notification settings - Fork 50
Conversation
lib/src/condition_variable.dart
Outdated
} | ||
return _completer.future; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is equivalent to a broadcast Stream<void>
where you can only wait for the next event. An implementation could be:
class Notifier {
final _control = StreamController<void>.broadcast();
void notify() { _control.add(null); }
Future<void> get wait => _control.stream.first;
}
(Probably more expensive, though.)
Future<void> parallelForEach( | ||
int maxParallel, | ||
FutureOr<void> Function(T item) each, { | ||
FutureOr<void> Function(Object e, StackTrace? st) onError = Future.error, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm open to:
FutureOr<bool> Function(Exception e) continueIf
On the assumption that:
- You should never ignore an
Error
(I could also be convince to be less opinionated). - You should never consider the stack trace when deciding to continue.
But I could use help choosing a better name? breakIf
or breakOnError
or ignoreErrorIf
or filterError
or errorIf
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not open to special-casing Exception
. Especially not the raw Exception
type.
If the user knows that each
can throw a specific subtype of Exception
, then each
should be made to handle that, rather than failing and having a second function handle it (but not break iteration?).
I'd just not have this parameter at all.
And I'd make the first each
to throw cancel the entire operation (probably still need to wait for the rest of the pending futures to complete, but I'd cancel the stream subscription.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Notifier is neat. A simple primitive that can occasionally be useful.
The parallelForEach
feels more specialized, and the error handling is not consistent with other similar APIs.
The arguments look like they apply to stream events, but the onError
only applies to the result of each
, which shouldn't be necessary (if the user wants to catch errors from each
, they just wrap the function with error handling. They know better what to handle and do.)
import 'package:meta/meta.dart'; | ||
|
||
/// A [Notifier] allows micro-tasks to [wait] for other micro-tasks to | ||
/// [notify]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't mention the microtask!
(Also, makes no sense. If a microtask is waiting for something, it's no longer the same microtask.)
So:
/// An asynchronous synchonization primitive.
///
/// Any number of asynchronous computations can
/// wait on the [wait] future, which will be
/// completed when someone calls [notify()].
/// After that, further reads of [wait] will provide
/// a new future which is then completed by the next
/// following call to [notify()].
// TODO: Apply `final` when language version for this library is bumped to 3.0 | ||
@sealed | ||
class Notifier { | ||
var _completer = Completer<void>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd make this nullable
, setting it to non-null
on the first wait
, and setting it back to null
on notify
.
That way, if the first call is notify
, you're not wasting a completer.
} | ||
} | ||
|
||
/// Wait for notification. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noun phrase for getters.
Maybe call it notification
, since wait
is also a verb.
But long, wait
is short.
Maybe next
var tick = Notifier();
....
tick.notify();
...
do {
await tick.next;
} while (result != "success");
``
|
||
/// Wait for notification. | ||
/// | ||
/// Returns a [Future] that will complete the next time [notify] is called. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use "Returns" about properties.
/// The `wait` [Future] is completed the next time [notify] is called.
}) async { | ||
// Track the first error, so we rethrow when we're done. | ||
Object? firstError; | ||
StackTrace? firstStackTrace; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd store this as (Object, StackTrace)?
now, so we won't need to do if (firstError != null) ... (firstError, firstStackTrace!)
.
Future<void> parallelForEach( | ||
int maxParallel, | ||
FutureOr<void> Function(T item) each, { | ||
FutureOr<void> Function(Object e, StackTrace? st) onError = Future.error, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not open to special-casing Exception
. Especially not the raw Exception
type.
If the user knows that each
can throw a specific subtype of Exception
, then each
should be made to handle that, rather than failing and having a second function handle it (but not break iteration?).
I'd just not have this parameter at all.
And I'd make the first each
to throw cancel the entire operation (probably still need to wait for the rest of the pending futures to complete, but I'd cancel the stream subscription.
/// }); | ||
/// print('Folder size: $folderSize'); | ||
/// ``` | ||
Future<void> parallelForEach( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could return a Stream<void>
, if you want to know how many operations have completed, and see all their errors.
Or make it:
Stream<R> parallelMap<R>(int maxParallel, FutureOr<R> mapData(T), {FutureOr<R> Function(Object error, StackTrace stack)? mapError});
which just calls mapData
on each element and mapError
on each error (or forwards it unchanged if omitted), and does not guarantee preserving order.
|
||
try { | ||
var doBreak = false; | ||
await for (final item in this) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This throws if the stream contains an error. User might expect that their onError
would handle it.
(If anything, I'd expect the onError
argument to apply to these errors, not errors created by each
. The callbacks are provide at the same level, so one of them should not apply to the result of the other.)
} finally { | ||
// When [each] is done, we decrement [running] and notify | ||
running -= 1; | ||
itemDone.notify(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't really need notify
for this.
Future<void> parallelForEach(
int maxParallel,
FutureOr<void> Function(T item) each) {
var pending = 0;
var done = false;
var result = Completer<void>.sync();
(Object, StackTrace)? firstError;
void complete() {
assert(pending == 0);
assert(done);
if (firstError case (var error, var stack)) {
result.completeError(error, stack);
} else {
result.complete(null);
}
}
var subscription = stream.listen(null, onDone: () {
done = true;
if (pending == 0) {
complete();
}
});
subscription
..onError((Object error, StackTrace stack) {
subscription.cancel().ignore();
done = true;
firstError = (error, stack); // Takes precedence over user errors.
if (pending == 0) complete();
})
..onData((T value) {
try {
var computation = each(value);
if (computation is Future) {
if (++pending >= maxParallel) subscription.pause();
computation.then((_) {
if (--pending == 0 && done) complete();
subscription.resume();
}, onError: (Object error, StackTrace stack) {
subscription.cancel().ignore();
done = true;
firstError ??= (error, stack);
if (--pending == 0) complete();
});
} catch (error, stack) {
subscription.cancel().ignore();
done = true;
firstError ??= (error, stack);
if (pending == 0) complete();
}
});
return result.future;
}
Closing as the dart-lang/async repository is merged into the dart-lang/core monorepo. Please re-open this PR there! |
Adding:
Notifier
withwait
that'll be resolved whennotify()
is called. This is a primitive for signaling.Stream.parallelForEach(maxParallel, eachFn, onError)
for callingeachFn
in parallel for each item in the stream.It's possible that
onError
really should be calledignoreError
and only takeObject e
or evenException e
. Since:Or maybe we should call it
continueIf
, because it really makes sense to write:continueIf: (Exception e) => e is TimeoutException
and things like that.Having an
onError
method where you can choose to rethrow or not is really bad. BecauseError.throwWithStackTrace
is annoying to call, so people are likely to loose their stack traces.